[Concurrent Low-Code] ConcurrentDeclarativeSource class that low-code connectors can inherit from to uptake Concurrent CDK #46662
base: master
Conversation
connector_state_converter = CustomOutputFormatConcurrentStreamStateConverter(
    datetime_format=declarative_cursor_attributes.datetime_format,
    is_sequential_state=False,
By disabling is_sequential_state, we automatically move connectors from their sequential state to per-partition concurrent state. This is the ideal end goal we want. However, if we were to revert the connector to a previous version that doesn't use the concurrent CDK, the state message would not be compatible.
It feels like, to de-risk this a little on the early release, we should set this to true so we accept sequential state (i.e. {"updated_at": "2024-12-12"}) and also emit it back to the platform as such.
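To illustrate the compatibility concern, here is a rough sketch of the two state shapes. The field names and the concurrent layout below are illustrative assumptions, not the CDK's exact format:

```python
# Sequential state: a single cursor value, understood by older connector versions.
sequential_state = {"updated_at": "2024-12-12"}

# Per-partition concurrent state: slice bookkeeping that pre-concurrent
# connector versions cannot parse (shape is illustrative only).
concurrent_state = {
    "state_type": "date-range",
    "slices": [{"start": "2024-01-01", "end": "2024-12-12"}],
}


def to_sequential(state: dict) -> dict:
    """Collapse a concurrent date-range state back into a single cursor value."""
    latest = max(s["end"] for s in state["slices"])
    return {"updated_at": latest}


print(to_sequential(concurrent_state))  # {'updated_at': '2024-12-12'}
```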
do you think we could add a mechanism to handle this situation by using the lowest value to set the sequential state if we revert the changes?
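One possible shape for that fallback, sketched with hypothetical names: take the minimum per-partition cursor value, so a reverted connector resumes without skipping any partition's data.

```python
def concurrent_to_sequential_floor(partition_states: dict) -> dict:
    # Hypothetical helper: the lowest per-partition cursor value is the only
    # point from which every partition can safely resume sequentially.
    lowest = min(partition_states.values())
    return {"updated_at": lowest}


states = {"account_1": "2024-10-01", "account_2": "2024-08-15"}
print(concurrent_to_sequential_floor(states))  # {'updated_at': '2024-08-15'}
```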
@@ -32,3 +33,6 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
        The error object will be cast to string to display the problem to the user.
        """
        return self.connection_checker.check_connection(self, logger, config)

    def all_streams(self, config: Mapping[str, Any]) -> List[Stream]:
        return self.streams(config=config)
For non-concurrent low-code sources, these are equivalent, but we override this implementation in ConcurrentDeclarativeSource to create a single list of both concurrent and synchronous streams so that we properly generate catalogs and other artifacts.
cursor_field=[declarative_cursor_attributes.cursor_field]
if declarative_cursor_attributes.cursor_field is not None
else None,
Suggested change:
cursor_field=(
    [declarative_cursor_attributes.cursor_field] if declarative_cursor_attributes.cursor_field is not None else None
),
catalog: ConfiguredAirbyteCatalog,
concurrent_stream_names: set[str],
) -> ConfiguredAirbyteCatalog:
    return ConfiguredAirbyteCatalog(streams=[stream for stream in catalog.streams if stream.stream.name not in concurrent_stream_names])
Suggested change:
catalog.streams = [stream for stream in catalog.streams if stream.stream.name not in concurrent_stream_names]
return catalog
Got distracted, just posting this so comments don't get lost, will read thoroughly later
@@ -28,7 +28,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
-       streams = source.streams(config)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
+       streams = source.all_streams(config)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
Teach me your ways, what is the difference between streams and all_streams? (ignore if it's in this PR, reading through it)
good question, I mention it in a comment: https://github.com/airbytehq/airbyte/pull/46662/files#r1792816384
But the reason why we can't just rewrite the streams() method is because within the existing Python CDK core.py, when processing synchronous streams, we invoke the streams() method, and in that context we don't want to return the concurrent streams that aren't compatible in that area of code.
As we discussed earlier, it's preferable to use the stream method and condition the behavior accordingly. This approach adds some complexity, but it's a tradeoff that allows simpler modifications later. With this setup, when the core is able to handle concurrent streams, we'll get a stream generation interface for free.
yep this is addressed in my latest commit using the optional param include_concurrent_streams
@@ -67,6 +67,7 @@ def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """

@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@brianjlai probably better to call out that it should not be used at all, if we're ripping out availability strategies over mid-long term?
ah yeah i think i carried that over from a merge of serhii's work which deprecated availability strategy in concurrent. I'll update this to say do not use
Overall, the implementation looks great—nice work! However, we still need to make a few updates. After that I can approve.
# This needs to be revisited as we can't lose precision
if isinstance(obj, datetime):
    return list(obj.timetuple())[0:6]
Should we set 6 as a variable to avoid using magic numbers?
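For example, a named constant could document what the slice keeps. This is a sketch of the suggestion, not the PR's code:

```python
from datetime import datetime

# Name documents what the slice keeps: year, month, day, hour, minute, second.
DATETIME_COMPONENTS_TO_SECOND_PRECISION = 6


def serialize_datetime(obj: datetime) -> list:
    # timetuple() yields (year, mon, day, hour, min, sec, wday, yday, isdst);
    # keep only the first six fields, dropping weekday/yearday/DST flags.
    return list(obj.timetuple())[:DATETIME_COMPONENTS_TO_SECOND_PRECISION]


print(serialize_datetime(datetime(2024, 12, 12, 10, 30, 5)))
# [2024, 12, 12, 10, 30, 5]
```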
yes, thank you for calling this out. I put this in as a placeholder while getting this tested the first time around, and it needs to be reinvestigated/fixed.
    concurrency_level = concurrency_level_component.get_concurrency_level()
    initial_number_of_partitions_to_generate = concurrency_level // 2
else:
    concurrency_level = 1
I think it would be better to move this value from the code into a class variable to improve readability.
def all_streams(self, config: Mapping[str, Any]) -> List[Stream]:
    return self._synchronous_streams + self._concurrent_streams  # type: ignore # Although AbstractStream doesn't inherit Stream, they were designed to fit the same interface when called from streams()

def _separate_streams(self, config: Mapping[str, Any]) -> Tuple[List[AbstractStream], List[Stream]]:
Do you think the name _group_streams would be more informative?
i'm fine with _group_streams
for slice_start, slice_end in self._cursor.generate_slices():
    stream_slice = StreamSlice(partition={}, cursor_slice={"start": slice_start, "end": slice_end})

start_boundary = self._slice_boundary_fields[self._START_BOUNDARY] if self._slice_boundary_fields else "start"
For the current implementation of the datetime cursor, self._slice_boundary_fields never has a None value.
from source_amplitude import SourceAmplitude


def _get_source(args: List[str]):
I think we need to mention clearly in the PR description that this will be a breaking change, and add info about it to the cdk-migration file.
yep, that is the plan. I wrote up a migration guide, to be included in the next commit I push, explaining what needs to change in run.py and source.py.
A couple minor comments. I'll defer to Serhii for approval.
Also can you run regression tests w/ one/two of the test connectors?
state_manager = ConnectorStateManager(state=self._state)  # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

self.logger.info(f"what is config: {config}")
Is this a personal debugging log?
it is. i'm removing this thank you
    declarative_cursor.get_partition_field_end().eval(config=config),
)

interpolated_state_date = declarative_cursor.get_start_datetime()
nit: interpolated_state_date, typo?
good eyes!
end_boundary = self._slice_boundary_fields[self._END_BOUNDARY] if self._slice_boundary_fields else "end"

wam = list(self._cursor.generate_slices())
for slice_start, slice_end in wam:
nit: It seems like it would be more memory efficient to directly iterate over the generated slices, is there a specific reason for saving to a list?
correct. I had originally added this for debugging to see the entire set of slices more easily, but you are right, this should be iterated over directly.
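A minimal sketch of the difference, using an illustrative generator rather than the CDK's:

```python
# Materializing a generator into a list forces every slice into memory at
# once; iterating directly keeps memory usage flat, one slice at a time.
def generate_slices():
    for day in range(1, 4):
        yield (f"2024-01-0{day}", f"2024-01-0{day + 1}")


# Instead of: slices = list(generate_slices()); for start, end in slices: ...
collected = []
for slice_start, slice_end in generate_slices():  # lazy iteration
    collected.append((slice_start, slice_end))

print(collected[0])  # ('2024-01-01', '2024-01-02')
```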
# This needs to be revisited as we can't lose precision
if isinstance(obj, datetime):
    return list(obj.timetuple())[0:6]
I don't follow this -- Is there somewhere we would be serializing a datetime object?
i'll double check, but I think we needed this serializer deeper in our code, potentially in how we emit state back out. I'll reconfirm this as I work through @lazebnyi comment above.
I'm also curious about this. From my understanding, this would mean that CursorPartitionGenerator would create slices with datetime within them but this is not what I see.
@lazebnyi @pnilan @maxi297 to close the loop on this one: I think what originally happened was that I wrote this in when I was first testing, because we were getting the datetime object from the ConcurrentCursor and it would fail trying to serialize it.
However, later, after cleaning up the code and fixing edge cases, I addressed the serialization by converting the datetime into the correct output string format with the correct precision in the generate() function here: https://github.com/airbytehq/airbyte/pull/46662/files#diff-93127bface0b323fe43b21cdb8fb14493dd465995b085a4f81647f3697930bddR396-R399. Since this is now already a string, we don't need to convert it again.
I'll get rid of this code as it's not actually used anymore, and we're applying the correct precision based on the cursor definition.
Here is a first pass of the review. I'm very happy with how many edge cases you've caught during this.
I have concerns, but all of them are open to discussion, and I'm willing to tackle some of them myself if it helps.
@dataclass
class DeclarativeCursorAttributes:
nit: I have a concern that this only works for DatetimeBasedCursor. How do we envision the next steps for that class? Could we avoid this class by having _get_cursor(<...>) -> airbyte_cdk.sources.streams.concurrent.Cursor instead of _get_cursor_attributes? Or can we make this private?
@@ -28,7 +28,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
-       streams = source.streams(config)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
+       streams = source.streams(config=config, include_concurrent_streams=True)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
nit: Regarding the # type: ignore that was already there before your changes: should we just make it part of the interface then? I'm not sure why ConnectionChecker.check doesn't only take DeclarativeSource. It seems like even the typing for Source is too broad, since streams is defined at the AbstractStream level. It feels like a typing issue worth updating, right?
Proposed temporary fix: https://github.com/airbytehq/airbyte/pull/46995/files#r1806474429
    existing Python CDK. Streams that support concurrency are started from read().
    """
    if include_concurrent_streams:
        return self._synchronous_streams + self._concurrent_streams  # type: ignore # Although AbstractStream doesn't inherit Stream, they were designed to fit the same interface when called from streams()
I'm not sure this is entirely true. For example, a caller of streams could expect to call read_only_records, but this would not work in the case where AbstractStream is returned.
My understanding is that we need this because we still rely on the AbstractSource for check (all streams), discover (all streams) and read (only the non-concurrent streams).
Can we stop relying on streams at all? Can we rely on the previous Stream we already have for availability checks and discovery until we do the switch to AbstractStream entirely? The read should also work with this because we filter the catalog in ConcurrentDeclarativeSource.read.
Proposed fix: #46995
if isinstance(declarative_stream.retriever, SimpleRetriever) and isinstance(declarative_stream.retriever.requester, HttpRequester):
    http_requester = declarative_stream.retriever.requester
    if "stream_state" in http_requester._path.string:
This is a nice catch!
Maybe this is me being paranoid, but should we have a warning-level log so that we see this explicitly when we have on-call issues? I would add one for each case where we return False.
This is a good idea. i'll add a log to each of these scenarios
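A sketch of what such a warning could look like. The function name and message below are illustrative, not the PR's actual implementation:

```python
import logging

logger = logging.getLogger("airbyte")


def stream_supports_concurrency(stream_name: str, path_template: str) -> bool:
    # Emit a warning whenever a stream is forced to run synchronously, so
    # on-call engineers can see explicitly why concurrency was disabled.
    if "stream_state" in path_template:
        logger.warning(
            "Stream %s interpolates stream_state in its path and will run "
            "synchronously instead of concurrently.",
            stream_name,
        )
        return False
    return True


print(stream_supports_concurrency("orders", "/orders?since={{ stream_state['updated_at'] }}"))  # False
```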
@staticmethod
def _stream_supports_concurrent_partition_processing(declarative_stream: DeclarativeStream, cursor: DatetimeBasedCursor) -> bool:
    """
    Many connectors make use of stream_state during interpolation on a per-partition basis under the assumption that
Do we have documentation somewhere on things we need to do to fully migrate to the one CDK? If not, I can start one and this will be part of the list.
We do not have it documented at a high level as far as I am aware. However, in my writeup for the breaking change in cdk-migrations.md, I've listed all the connectors in the repo that use stream_state in a non-thread-safe way and what we can do to fix them.
But at a higher level, we should have a line item for the unification of our CDKs. Basically, we should update our schemas to not use stream_state as an interpolation context and remove it wherever it's used from the Retriever and below (although I don't think it's actually used above that).
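A simplified sketch of why per-slice interpolation is safe while a shared stream_state is not. Here render_path is a stand-in for the CDK's Jinja interpolation, and the slice field names are assumptions for illustration:

```python
def render_path(template: str, context: dict) -> str:
    # Minimal stand-in for the CDK's Jinja interpolation.
    return template.format(**context)


# Unsafe with concurrency: a shared stream_state can be mutated by other
# partitions while this partition is mid-read.
unsafe = render_path("/orders?since={stream_state}", {"stream_state": "2024-01-01"})

# Safe: each partition interpolates only its own immutable slice boundaries.
slice_context = {"start_time": "2024-01-01", "end_time": "2024-02-01"}
safe = render_path("/orders?since={start_time}&until={end_time}", slice_context)
print(safe)  # /orders?since=2024-01-01&until=2024-02-01
```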
I'll take some time to improve previous work on documenting this here. Thanks for sharing this!
if not stream.supports_incremental:
    return None

if isinstance(stream.retriever, SimpleRetriever):
Why not stream.get_cursor() here? I fear that this will not work for AsyncRetriever.
yeah this is just an oversight on my part, i've just got rid of this whole method and just call self.get_cursor() from above instead
return True

@staticmethod
def _get_cursor(stream: DeclarativeStream) -> Optional[DeclarativeCursor]:
The concern I have with this method is that we are coupling the creation of the concurrent cursor with code that should eventually be removed, i.e. we do (declarative -> low-code component -> concurrent) instead of (declarative -> concurrent). Once we remove the low-code components, we will have to re-implement this logic. Should we instead rely on airbyte_cdk.sources.declarative.models.declarative_component_schema.DatetimeBasedCursor?
That is a fair question. I think this can get a little awkward because we need to extract the DatetimeBasedCursor model from the _source_config field, and we lose the automatic parsing into interpolated fields and other things that we would basically need to reimplement. But it does feel like the model is the more solid interface that we won't change. I will play around with this tomorrow and see if it can be done sanely.
@@ -1,5 +1,94 @@
# CDK Migration Guide

## Upgrading to 6.x.x
Can we validate with @bnchrch what is the impact on the manifest-only sources and what is the migration path for them?
yep, i'll add a note in the guide. But it sounds like source-declarative-manifest is pinned to an explicit version, so we can migrate on our own time. We basically need to modify the respective run.py and source.py files the way we do for individual connectors and then bump the version. That in turn is actually the easiest way to release this to the most connectors, since no manifest-only connectors define their own run.py. As for the connector builder, it actually doesn't use YamlDeclarativeSource; it uses SourceDeclarativeManifest, which doesn't contain breaking changes. I think it's up for debate whether that should switch over, but for now it would be non-breaking there.
Closes https://github.com/airbytehq/airbyte-internal-issues/issues/9710
What
Adds the new ConcurrentDeclarativeSource class, which serves as the way we adapt the existing YamlDeclarativeSource used by all low-code connectors into being runnable within the Concurrent CDK framework. This PR combines all the previous units of work so that low-code streams are translated into concurrent DefaultStream instances. Another big aspect of this review is how we gate which streams will run concurrently.
How
The overall design is predicated on introducing a new class, ConcurrentDeclarativeSource, which behaves as a kind of adapter between the existing entrypoint.py that all syncs are triggered from and the ConcurrentSource, which is responsible for running certain streams using the Concurrent CDK engine.
The ConcurrentDeclarativeSource inherits from ManifestDeclarativeSource so that we can reuse the logic to parse a manifest into low-code runtime components and inspect its components to decide whether each stream can run concurrently or synchronously.
The last big part of the code puts into place the logic to transform a low-code stream's DatetimeBasedCursor into a ConcurrentCursor. We need to do this because the interfaces for a low-code cursor and a concurrent cursor differ in a few specific ways, and trying to make them both fit the same interface created a frankenstein class that proved to be even more unwieldy. In prior PRs (see 45413), it was determined that there was feature parity, so now we perform the transformation and supply the result to the concurrent engine to handle date window partitioning and state management.
Something else important to note is that there are some specific cases where an incremental stream cannot be run as a concurrent source. Since we introduced the language, we've allowed stream_state to be a valid interpolation context for various components. However, because partitions can run in any order and complete at any time, the stream_state managed by the ConcurrentCursor is no longer a thread-safe value (versus when it was managed sequentially). I inspected the schema and our repo for its usage. For streams using stream_state in an unsafe way, we make the stream synchronous, but we should fix those connectors to use the thread-safe stream_interval and ultimately get rid of the extra code later.
Short term how we enable this
I've included two examples of how connectors can uptake concurrent processing. They are the same and will be deleted before merging.
The two things that need to be changed are:
run.py - Our previous design for connectors did not take any arguments passed to the connector from the platform. This is a significant limitation because the concurrent framework is entirely based around instantiating things like cursors up front, before performing a read. I haven't found a great way to avoid changing this, as it is a limitation of the Concurrent CDK.
source.py - Once run.py is updated to pass in the various operation arguments like state, config, and catalog, we need to pass them to the ConcurrentDeclarativeSource constructor.
constructorReview guide
concurrent_declarative_source.py
datetime_based_cursor.py
adapter.py
datetime_stream_state_converter.py
yaml_declarative_source.py
source-sentry or source-amplitude
User Impact
This is considered a breaking CDK change because connectors will need to follow the included migration guide to update their run.py and source.py files.
filesCan this PR be safely reverted and rolled back?
Yes, because this isn't released yet. However, this does pose a risk once we move a connector to concurrent: once we start emitting the new state format, it is much harder to go backward, since the older connector cannot process concurrent state. See my comment in the code for more.